package ssm

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import ssm.OfflineWareHouse.logger

object WareHouseStreaming {

  /** Column names of the DataFrame appended to the MySQL `warehouse` table, in tuple order. */
  private val OutputColumns: Seq[String] =
    Seq("userIp", "local_time", "method", "url", "agreement", "status", "SE", "name")

  /**
   * Turns one tab-split access-log line into an output record.
   *
   * The first five fields are read (and trimmed) as: user IP, local time, request line
   * (`METHOD URL PROTOCOL`, space separated), HTTP status and referer; extra fields are ignored.
   * Only lines whose status is `200` and whose referer is not `-` are kept. The referer is split
   * at `?`: the part before it becomes `SE`, and the value after the first `=` of the part after
   * it becomes `name`.
   *
   * @param fields the columns of one log line, already split on tabs
   * @return `Some((userIp, local_time, method, url, agreement, status, SE, name))`, or `None`
   *         when the line is filtered out or too malformed to parse (rather than throwing and
   *         failing the whole micro-batch)
   */
  def parseRecord(
      fields: Array[String]): Option[(String, String, String, String, String, String, String, String)] = {
    if (fields.length < 5) None
    else {
      val userIp = fields(0).trim
      val localTime = fields(1).trim
      val request = fields(2).trim
      val status = fields(3).trim
      val referer = fields(4).trim

      if (status != "200" || referer == "-") None
      else
        (request.split(" "), referer.split("\\?")) match {
          case (Array(method, url, agreement, _*), Array(se, query, _*)) =>
            query.split("=") match {
              case Array(_, name, _*) =>
                Some((userIp, localTime, method, url, agreement, status, se, name))
              case _ => None
            }
          case _ => None
        }
    }
  }

  /**
   * Entry point. Every 10 seconds it picks up new files in the input directory and parses the
   * access-log lines with [[parseRecord]]. It then appends the resulting records to the MySQL
   * table `warehouse`.
   *
   * @param args optional; `args(0)` overrides the monitored input directory
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("WareHouseStreaming").setMaster("local")

    // Credentials may be supplied via the environment; the literals are fallbacks kept so the
    // job behaves as before when nothing is configured.
    // NOTE(review): a real password should not live in source control — drop the fallback.
    val prop = new Properties()
    prop.put("user", sys.env.getOrElse("MYSQL_USER", "root"))
    prop.put("password", sys.env.getOrElse("MYSQL_PASSWORD", "z9633352"))
    prop.put("driver", "com.mysql.jdbc.Driver")
    val jdbcUrl = "jdbc:mysql://localhost:3306/python_db"

    val inputDir = args.headOption.getOrElse(
      "file:///C:/Users/Lenovo/Desktop/Working/Python/data/OfflineWareHouse/")

    val scc = new StreamingContext(sparkConf, Seconds(10))

    // textFileStream already yields one element per line; strip quotes, then split on tabs.
    val fieldsStream = scc
      .textFileStream(inputDir)
      .map(_.replaceAll("\"", ""))
      .map(_.split("\t"))

    fieldsStream.foreachRDD { rdd =>
      // Every interval without a new file produces an empty batch; skip it instead of issuing
      // an empty show + JDBC write.
      if (!rdd.isEmpty()) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        // Cached because show, the JDBC write and count are three separate actions; without
        // it the input files would be re-read and re-parsed for each one.
        val dfDetail = rdd
          .flatMap(fields => parseRecord(fields).toList)
          .toDF(OutputColumns: _*)
          .cache()

        try {
          dfDetail.show(5)
          println("开始写入数据库")
          dfDetail.write.mode("Append").jdbc(jdbcUrl, "warehouse", prop)
          println("完成写入数据库,新增" + dfDetail.count() + "条数据")
        } finally {
          dfDetail.unpersist()
        }
      }
    }

    scc.start()
    scc.awaitTermination()
  }
}
